diff --git a/pkg/controller/core/clusterqueue_controller_test.go b/pkg/controller/core/clusterqueue_controller_test.go index e13d149d03..6795394203 100644 --- a/pkg/controller/core/clusterqueue_controller_test.go +++ b/pkg/controller/core/clusterqueue_controller_test.go @@ -229,7 +229,9 @@ func TestUpdateCqStatusIfChanged(t *testing.T) { qManager: qManager, } if tc.newWl != nil { - r.qManager.AddOrUpdateWorkload(tc.newWl) + if err := r.qManager.AddOrUpdateWorkload(tc.newWl); err != nil { + t.Fatalf("Failed to add or update workload : %v", err) + } } gotError := r.updateCqStatusIfChanged(ctx, cq, tc.newConditionStatus, tc.newReason, tc.newMessage) if diff := cmp.Diff(tc.wantError, gotError, cmpopts.EquateErrors()); len(diff) != 0 { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index a6e8c98fe5..ba00452979 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -599,8 +599,9 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { workload.AdjustResources(ctx, r.client, wlCopy) if !workload.HasQuotaReservation(wl) { - if !r.queues.AddOrUpdateWorkload(wlCopy) { - log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") + err := r.queues.AddOrUpdateWorkload(wlCopy) + if err != nil { + log.V(2).Info(fmt.Sprintf("%s; ignored for now", err)) } return true } @@ -703,10 +704,10 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { }) case prevStatus == workload.StatusPending && status == workload.StatusPending: - if !r.queues.UpdateWorkload(oldWl, wlCopy) { - log.V(2).Info("Queue for updated workload didn't exist; ignoring for now") + err := r.queues.UpdateWorkload(oldWl, wlCopy) + if err != nil { + log.V(2).Info(fmt.Sprintf("%s; ignored for now", err)) } - case prevStatus == workload.StatusPending && (status == workload.StatusQuotaReserved || status == workload.StatusAdmitted): r.queues.DeleteWorkload(oldWl) if !r.cache.AddOrUpdateWorkload(wlCopy) { @@ -729,8 +730,9 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { // Here we don't take the lock as it is already taken by the wrapping // function. if immediate { - if !r.queues.AddOrUpdateWorkloadWithoutLock(wlCopy) { - log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") + err := r.queues.AddOrUpdateWorkloadWithoutLock(wlCopy) + if err != nil { + log.V(2).Info(fmt.Sprintf("%s; ignored for now", err)) } } }) @@ -741,8 +743,9 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { updatedWl := kueue.Workload{} err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWl) if err == nil && workload.Status(&updatedWl) == workload.StatusPending { - if !r.queues.AddOrUpdateWorkload(wlCopy) { - log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") + err := r.queues.AddOrUpdateWorkload(wlCopy) + if err != nil { + log.V(2).Info(fmt.Sprintf("%s; ignored for now", err)) } else { log.V(3).Info("Workload requeued after backoff") } @@ -886,8 +889,9 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ log := log.WithValues("workload", klog.KObj(wlCopy)) log.V(5).Info("Queue reconcile for") workload.AdjustResources(ctrl.LoggerInto(ctx, log), h.r.client, wlCopy) - if !h.r.queues.AddOrUpdateWorkload(wlCopy) { - log.V(2).Info("Queue for workload didn't exist") + err := h.r.queues.AddOrUpdateWorkload(wlCopy) + if err != nil { + log.V(2).Info(err.Error()) } } } diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index dcc5f89ba3..c0c78b9900 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -319,28 +319,28 @@ func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool) { // AddOrUpdateWorkload adds or updates workload to the corresponding queue. // Returns whether the queue existed. -func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool { +func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) error { m.Lock() defer m.Unlock() return m.AddOrUpdateWorkloadWithoutLock(w) } -func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) bool { +func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) error { qKey := workload.QueueKey(w) q := m.localQueues[qKey] if q == nil { - return false + return ErrQueueDoesNotExist } wInfo := workload.NewInfo(w, m.workloadInfoOptions...) q.AddOrUpdate(wInfo) cq := m.hm.ClusterQueues[q.ClusterQueue] if cq == nil { - return false + return ErrClusterQueueDoesNotExist } cq.PushOrUpdate(wInfo) m.reportPendingWorkloads(q.ClusterQueue, cq) m.Broadcast() - return true + return nil } // RequeueWorkload requeues the workload ensuring that the queue and the @@ -503,7 +503,7 @@ func requeueWorkloadsCohortSubtree(ctx context.Context, m *Manager, cohort *coho // UpdateWorkload updates the workload to the corresponding queue or adds it if // it didn't exist. Returns whether the queue existed. -func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) bool { +func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) error { m.Lock() defer m.Unlock() if oldW.Spec.QueueName != w.Spec.QueueName { diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go index 8c7c186c55..5bad9654e5 100644 --- a/pkg/queue/manager_test.go +++ b/pkg/queue/manager_test.go @@ -346,7 +346,9 @@ func TestUpdateLocalQueue(t *testing.T) { } } for _, w := range workloads { - manager.AddOrUpdateWorkload(w) + if err := manager.AddOrUpdateWorkload(w); err != nil { + t.Errorf("Failed to add or update workload: %v", err) + } } // Update cluster queue of first queue. @@ -417,8 +419,8 @@ func TestAddWorkload(t *testing.T) { } } cases := []struct { - workload *kueue.Workload - wantAdded bool + workload *kueue.Workload + wantErr string }{ { workload: &kueue.Workload{ @@ -428,7 +430,7 @@ func TestAddWorkload(t *testing.T) { }, Spec: kueue.WorkloadSpec{QueueName: "foo"}, }, - wantAdded: true, + wantErr: "", }, { workload: &kueue.Workload{ @@ -438,6 +440,7 @@ func TestAddWorkload(t *testing.T) { }, Spec: kueue.WorkloadSpec{QueueName: "baz"}, }, + wantErr: ErrQueueDoesNotExist.Error(), }, { workload: &kueue.Workload{ @@ -447,6 +450,7 @@ func TestAddWorkload(t *testing.T) { }, Spec: kueue.WorkloadSpec{QueueName: "bar"}, }, + wantErr: ErrClusterQueueDoesNotExist.Error(), }, { workload: &kueue.Workload{ @@ -456,12 +460,14 @@ func TestAddWorkload(t *testing.T) { }, Spec: kueue.WorkloadSpec{QueueName: "foo"}, }, + wantErr: ErrQueueDoesNotExist.Error(), }, } for _, tc := range cases { t.Run(tc.workload.Name, func(t *testing.T) { - if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded { - t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded) + err := manager.AddOrUpdateWorkload(tc.workload) + if err != nil && err.Error() != tc.wantErr { + t.Fatalf("AddWorkload returned %v, want %v", err, tc.wantErr) } }) } @@ -527,7 +533,7 @@ func TestStatus(t *testing.T) { } } for _, wl := range workloads { - manager.AddOrUpdateWorkload(&wl) + _ = manager.AddOrUpdateWorkload(&wl) } cases := map[string]struct { @@ -671,6 +677,7 @@ func TestUpdateWorkload(t *testing.T) { wantUpdated bool wantQueueOrder map[string][]string wantQueueMembers map[string]sets.Set[string] + wantErr error }{ "in queue": { clusterQueues: []*kueue.ClusterQueue{ @@ -761,6 +768,7 @@ func TestUpdateWorkload(t *testing.T) { wantQueueMembers: map[string]sets.Set[string]{ "/foo": nil, }, + wantErr: ErrQueueDoesNotExist, }, "from non existing queue": { clusterQueues: []*kueue.ClusterQueue{ @@ -799,12 +807,13 @@ func TestUpdateWorkload(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + _ = manager.AddOrUpdateWorkload(w) } wl := tc.workloads[0].DeepCopy() tc.update(wl) - if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated { - t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated) + err := manager.UpdateWorkload(tc.workloads[0], wl) + if (err != nil) != (tc.wantErr != nil) { + t.Errorf("UpdatedWorkload returned %t, want %t", err, tc.wantErr) } q := manager.localQueues[workload.QueueKey(wl)] if q != nil { @@ -916,7 +925,9 @@ func TestHeads(t *testing.T) { go manager.CleanUpOnContext(ctx) for _, wl := range tc.workloads { - manager.AddOrUpdateWorkload(wl) + if err := manager.AddOrUpdateWorkload(wl); err != nil { + t.Errorf("Failed to add or update workload: %v", err) + } } wlNames := sets.New[string]() @@ -971,13 +982,15 @@ func TestHeadsAsync(t *testing.T) { "AddClusterQueue": { initialObjs: []client.Object{&wl, &queues[0]}, op: func(ctx context.Context, mgr *Manager) { + if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil { + t.Errorf("Failed adding clusterQueue: %v", err) + } if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil { t.Errorf("Failed adding queue: %s", err) } - mgr.AddOrUpdateWorkload(&wl) go func() { - if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil { - t.Errorf("Failed adding clusterQueue: %v", err) + if err := mgr.AddOrUpdateWorkload(&wl); err != nil { + t.Errorf("Failed to add or update workload: %v", err) } }() }, @@ -1016,7 +1029,9 @@ func TestHeadsAsync(t *testing.T) { t.Errorf("Failed adding queue: %s", err) } go func() { - mgr.AddOrUpdateWorkload(&wl) + if err := mgr.AddOrUpdateWorkload(&wl); err != nil { + t.Errorf("Failed to add or update workload: %v", err) + } }() }, wantHeads: []workload.Info{ @@ -1037,7 +1052,9 @@ func TestHeadsAsync(t *testing.T) { go func() { wlCopy := wl.DeepCopy() wlCopy.ResourceVersion = "old" - mgr.UpdateWorkload(wlCopy, &wl) + if err := mgr.UpdateWorkload(wlCopy, &wl); err != nil { + t.Errorf("Failed to add or update workload: %v", err) + } }() }, wantHeads: []workload.Info{ @@ -1217,7 +1234,9 @@ func TestGetPendingWorkloadsInfo(t *testing.T) { } } for _, w := range workloads { - manager.AddOrUpdateWorkload(w) + if err := manager.AddOrUpdateWorkload(w); err != nil { + t.Errorf("Failed to add or update workload: %v", err) + } } cases := map[string]struct { diff --git a/pkg/visibility/api/v1beta1/pending_workloads_cq_test.go b/pkg/visibility/api/v1beta1/pending_workloads_cq_test.go index 6128f65fc5..8f20de38b2 100644 --- a/pkg/visibility/api/v1beta1/pending_workloads_cq_test.go +++ b/pkg/visibility/api/v1beta1/pending_workloads_cq_test.go @@ -338,7 +338,9 @@ func TestPendingWorkloadsInCQ(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + if err := manager.AddOrUpdateWorkload(w); err != nil { + t.Fatalf("Failed to add or update workload %q: %v", w.Name, err) + } } info, err := pendingWorkloadsInCqRest.Get(ctx, tc.req.queueName, tc.req.queryParams) diff --git a/pkg/visibility/api/v1beta1/pending_workloads_lq_test.go b/pkg/visibility/api/v1beta1/pending_workloads_lq_test.go index 54c2bdadb0..915dd05725 100644 --- a/pkg/visibility/api/v1beta1/pending_workloads_lq_test.go +++ b/pkg/visibility/api/v1beta1/pending_workloads_lq_test.go @@ -454,7 +454,9 @@ func TestPendingWorkloadsInLQ(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + if err := manager.AddOrUpdateWorkload(w); err != nil { + t.Fatalf("Failed to add or update workload :%v", err) + } } ctx = request.WithNamespace(ctx, tc.req.nsName)